package cool.houge.ws.impl;

import com.typesafe.config.Config;
import cool.houge.ws.ConfigKeys;
import cool.houge.ws.PacketHandler;
import cool.houge.ws.packet.Packet;
import cool.houge.ws.session.Session;
import cool.houge.ws.session.SessionManager;
import io.avaje.inject.Component;
import io.avaje.jsonb.Jsonb;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

import java.util.List;

import static java.util.Objects.requireNonNull;

/**
 * @author ZY (kzou227@qq.com)
 */
@Component
public class KafkaPacketProcessor implements PacketHandler, AutoCloseable {

    private static final Logger log = LogManager.getLogger(KafkaPacketProcessor.class);

    /** Topic that inbound packets are published to for downstream processing. */
    private static final String TOPIC_PROCESS = "messenger-process";
    /** Topic this node subscribes to for packets that must be delivered to local sessions. */
    private static final String TOPIC_TRANSMIT = "messenger-transmit";

    private final Jsonb jsonb;
    private final SessionManager sessionManager;
    private final KafkaSender<String, byte[]> kafkaSender;
    private final KafkaReceiver<String, byte[]> kafkaReceiver;
    private final Disposable receiverDisposable;

    /**
     * Creates the Kafka sender/receiver and immediately starts consuming the
     * transmit topic.
     *
     * @param config application configuration (app name, Kafka bootstrap servers, group id)
     * @param jsonb JSON codec used to (de)serialize {@link Packet}s
     * @param sessionManager registry used to locate target sessions for delivery
     */
    public KafkaPacketProcessor(Config config, Jsonb jsonb, SessionManager sessionManager) {
        requireNonNull(config);
        requireNonNull(jsonb);
        // FIX: sessionManager was the only constructor argument not null-checked;
        // a null would otherwise surface later as an opaque NPE inside transmit().
        requireNonNull(sessionManager);

        this.jsonb = jsonb;
        this.sessionManager = sessionManager;

        this.kafkaSender = KafkaSender.create(newSenderOptions(config));
        this.kafkaReceiver = KafkaReceiver.create(newReceiverOptions(config)
                .consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, config.getString(ConfigKeys.KAFKA_GROUP_ID))
                .subscription(List.of(TOPIC_TRANSMIT)));
        // NOTE(review): subscribing in the constructor lets `this` escape before
        // construction completes; acceptable here only because transmit() reads
        // exclusively final fields already assigned above.
        this.receiverDisposable = this.kafkaReceiver
                .receive()
                .doOnNext(this::transmit)
                // FIX: the bare onErrorComplete() silently and permanently stopped
                // consumption on any receiver error; log it so a dead consumer is
                // visible in the logs before the stream completes.
                .doOnError(t -> log.error("Kafka receive stream terminated", t))
                .onErrorComplete()
                .subscribe();
    }

    /**
     * Publishes the packet to the processing topic, keyed by its message id.
     *
     * @param session originating session (currently unused by this handler)
     * @param packet packet to publish; serialized to JSON bytes
     * @return a {@code Mono} completing when Kafka has acknowledged the send,
     *     erroring if serialization or the send fails
     */
    @Override
    public Mono<Void> handle(Session session, Packet packet) {
        return kafkaSender
                .send(Mono.fromCallable(() -> {
                    var bytes = jsonb.toJsonBytes(packet);
                    var record = new ProducerRecord<>(TOPIC_PROCESS, packet.getMessageId(), bytes);
                    // The message id rides along as correlation metadata so the
                    // send result can be tied back to the packet in the log below.
                    return SenderRecord.create(record, packet.getMessageId());
                }))
                .doOnNext(rs -> log.info("消息成功发布至Kafka：messageId={}", rs.correlationMetadata()))
                .then();
    }

    /** Releases the Kafka sender and stops the receive subscription. Never throws. */
    @Override
    public void close() {
        try {
            kafkaSender.close();
        } catch (Exception e) {
            // FIX: previously swallowed silently; keep close() non-throwing but
            // leave a trace so shutdown problems are diagnosable.
            log.warn("Error closing KafkaSender", e);
        }

        try {
            receiverDisposable.dispose();
        } catch (Exception e) {
            log.warn("Error disposing Kafka receiver subscription", e);
        }
    }

    /**
     * Delivers a received record to the target user's local sessions.
     *
     * <p>Delivery runs asynchronously on the bounded-elastic scheduler so the
     * receive loop is never blocked by slow sessions.
     */
    void transmit(ReceiverRecord<String, byte[]> record) {
        Flux.defer(() -> {
                    var packet = jsonb.type(Packet.class).fromJson(record.value());
                    return sessionManager.findByUid(packet.getTo()).flatMap(session -> session.send(packet));
                })
                .subscribeOn(Schedulers.boundedElastic())
                // FIX: offsets were never acknowledged, so consumer progress was
                // never committed and the whole topic was re-consumed after every
                // restart. Acknowledging in doFinally commits progress even on
                // failure (at-most-once delivery for broken records) — matching
                // the pre-existing best-effort behavior without redelivery storms.
                .doFinally(st -> record.receiverOffset().acknowledge())
                // FIX: errors were dropped to the global onErrorDropped hook with
                // no context; log them with the record's coordinates instead.
                .subscribe(
                        null,
                        t -> log.error(
                                "Failed to transmit packet: topic={} partition={} offset={}",
                                record.topic(), record.partition(), record.offset(), t));
    }

    /**
     * Builds producer options: String keys, raw JSON byte values.
     *
     * <p>NOTE(review): {@code acks=1} trades durability for latency (leader-only
     * acknowledgement); confirm this is acceptable for message traffic.
     */
    SenderOptions<String, byte[]> newSenderOptions(Config config) {
        return SenderOptions.<String, byte[]>create()
                .producerProperty(ProducerConfig.CLIENT_ID_CONFIG, config.getString(ConfigKeys.APP_NAME))
                .producerProperty(
                        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getString(ConfigKeys.KAFKA_BOOTSTRAP_SERVERS))
                .producerProperty(ProducerConfig.ACKS_CONFIG, "1")
                .producerProperty(ProducerConfig.RETRIES_CONFIG, 2)
                .withKeySerializer(new StringSerializer())
                .withValueSerializer(new ByteArraySerializer());
    }

    /** Builds consumer options mirroring the producer's String/byte[] wire format. */
    ReceiverOptions<String, byte[]> newReceiverOptions(Config config) {
        return ReceiverOptions.<String, byte[]>create()
                .consumerProperty(ConsumerConfig.CLIENT_ID_CONFIG, config.getString(ConfigKeys.APP_NAME))
                .consumerProperty(
                        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getString(ConfigKeys.KAFKA_BOOTSTRAP_SERVERS))
                .withKeyDeserializer(new StringDeserializer())
                .withValueDeserializer(new ByteArrayDeserializer());
    }
}
